开发 Operator 调度 GPU 实例资源池 原创
✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
最近在学习《AIOps》相关的知识课程,为了让学习有一定的收获,所以将其进行了总结分享,如果你恰好也需要,很荣幸能帮到你。
前面我们介绍了《开发K8s Chat 命令行工具》和《开发 K8s GPT 故障诊断工具》两篇和 K8s 相关的文章,本篇文章我们将把 K8s、AI、云 三者结合起来,开发一个 AI 工具。
本章节将引入一个新的概念——K8s Operator,它是 K8s 的一种扩展形式,可以帮助用户以 K8s 声明式 API 的方式管理应用及服务,Operator 定义了一组在 Kubernetes 集群中打包和部署复杂业务应用的方法,主要是为解决特定应用或服务关于如何运行、部署及出现问题时如何处理提供的一种特定的自定义方式。比如:
- 按需部署应用服务
- 实现应用状态的备份和还原,完成版本升级
- 数据库 schema 或额外的配置设置的改动
在 K8s 中我们使用的 Deployment、Daemonset、Statefulset 等这些都是 K8s 的资源,这些资源的创建、删除、更新等动作都会被称为事件,K8s 的 Controller Manager 负责事件的监听,并触发对应的动作来满足期望,这种方式就是声明式,即用户只需要关心应用程序的最终状态。当我们在使用中发现有些资源并不能满足日常的需求,对于这类需求可以使用 K8s 的自定义资源和 Operator 为应用程序提供基于 K8s 的扩展。
在这其中,CRD 就是对自定义资源的描述,如果要自定义资源,就需要先定义好 CRD,也就是介绍这个资源有什么属性,这些属性的类型、结构是怎样的。
比如 PG 的 Operator 如下:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresqls.acid.zalan.do
labels:
app.kubernetes.io/name: postgres-operator
annotations:
"helm.sh/hook": crd-install
spec:
group: acid.zalan.do
names:
kind: postgresql
listKind: postgresqlList
plural: postgresqls
singular: postgresql
shortNames:
- pg additionalPrinterColumns:
- name: Team
type: string
description: Team responsible for Postgres CLuster
JSONPath: .spec.teamId
- name: Version
type: string
description: PostgreSQL version
JSONPath: .spec.postgresql.version
- name: Pods
type: integer
description: Number of Pods per Postgres cluster
JSONPath: .spec.numberOfInstances
- name: Volume
type: string
description: Size of the bound volume
JSONPath: .spec.volume.size
CRD 主要包括 apiVersion、kind、metadata 和 spec 四个部分。其中最关键的是 apiVersion 和 kind,apiVersion 表示资源所属组织和版本,apiVersion 一般由 APIGourp 和 Version 组成,这里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相关信息可以通过kubectl api-resoures查看。kind 表示资源类型,这里是CustomResourceDefinition,表示是一个自定义的资源描述。
本文我们将自己开发一个 Operator 来维护 GPU 资源池的稳定,解决 AI 模型训练的基础平台的稳定性。其架构如下:
其中:
- GPU 资源池采用的是腾讯云的竞价 GPU 实例
- Operator 运行在 K8s 中,通过 SpootPool 控制 GPU 资源池的数量
- 若云平台释放了某台 GPU 实例,当 Operator 监听到资源池数量和期望的不匹配,会自动补充到期望数量
Operator 的开发有多种脚手架,常用的有 operator-sdk、kubebuilder 等,这里我们将使用 kubebuilder 来完成 Operator 的开发。
前置条件
- 准备一个可用的 K8s 集群,可以使用 kind、kubeadm、二进制等各种形式安装,如果使用 kubeadm 安装集群,可以参考 Kubernetes集群管理。
- 安装好 kubebuilder,可以参考 kubebuild快速安装。
- 准备好云平台的 AK,这里是采用腾讯云,其他云类似。
快速开始
1、设计 CRD
在开发之前需要先设计好 CRD(就像业务开发前先设计好表结构一样),本文的 CRD 主要包含云平台虚拟机的开通,包括最小和最大实例数,以及腾讯云 SDK 所需要的各种参数,比如地域、可用区、VPC、子网、安全组、镜像等。
最后 CRD 设计如下:
apiVersion: devops.jokerbai.com/v1
kind: Spotpool
metadata:
labels:
app.kubernetes.io/name: spotpool
app.kubernetes.io/managed-by: kustomize
name: spotpool-sample
spec:
secretId: 密钥ID
secretKey: 密钥Key
region: 区域
availabilityZone: 可用区
instanceType: 实例类型
minimum: 最小实例数
maximum: 最大实例数
subnetId: 子网ID
vpcId: VPC ID
securityGroupIds:
- 安全组
imageId: 镜像ID
instanceChargeType: 实例付费类型
2、初始化项目
定义好 CRD 字段之后,我们先使用 kubebuilder 初始化一个 Operator 项目,命令如下:
(1)初始化项目
mkdir spotpool && cd spotpool
kubebuilder init \
--domain jokerbai.com \
--repo github.com/joker-bai/spotpool \
--project-name spotpool \
--plugins go/v4 \
--owner "Joker Bai"
(2)创建 API
kubebuilder create api --group devops.jokerbai.com --version v1 --kind Spotpool
(3)生成后的目录结构大致如下
.
├── api
│ └── v1
│ ├── groupversion_info.go
│ ├── spotpool_types.go
│ └── zz_generated.deepcopy.go
├── bin
│ ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
│ └── controller-gen-v0.18.0
├── cmd
│ └── main.go
├── config
│ ├── crd
│ │ ├── kustomization.yaml
│ │ └── kustomizeconfig.yaml
│ ├── default
│ │ ├── cert_metrics_manager_patch.yaml
│ │ ├── kustomization.yaml
│ │ ├── manager_metrics_patch.yaml
│ │ └── metrics_service.yaml
│ ├── manager
│ │ ├── kustomization.yaml
│ │ └── manager.yaml
│ ├── network-policy
│ │ ├── allow-metrics-traffic.yaml
│ │ └── kustomization.yaml
│ ├── prometheus
│ │ ├── kustomization.yaml
│ │ ├── monitor_tls_patch.yaml
│ │ └── monitor.yaml
│ ├── rbac
│ │ ├── kustomization.yaml
│ │ ├── leader_election_role_binding.yaml
│ │ ├── leader_election_role.yaml
│ │ ├── metrics_auth_role_binding.yaml
│ │ ├── metrics_auth_role.yaml
│ │ ├── metrics_reader_role.yaml
│ │ ├── role_binding.yaml
│ │ ├── role.yaml
│ │ ├── service_account.yaml
│ │ ├── spotpool_admin_role.yaml
│ │ ├── spotpool_editor_role.yaml
│ │ └── spotpool_viewer_role.yaml
│ └── samples
│ ├── devops.jokerbai.com_v1_spotpool.yaml
│ └── kustomization.yaml
├── Dockerfile
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
├── internal
│ └── controller
│ ├── spotpool_controller.go
│ ├── spotpool_controller_test.go
│ └── suite_test.go
├── Makefile
├── PROJECT
├── README.md
└── test
├── e2e
│ ├── e2e_suite_test.go
│ └── e2e_test.go
└── utils
└── utils.go
3、CRD 开发
(1)定义 API
在api/v1alpha1/spotpool_types.go
中定义 CRD 的结构体,如下:
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
SecretId string `json:"secretId,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Region string `json:"region,omitempty"`
AvaliableZone string `json:"availabilityZone,omitempty"`
InstanceType string `json:"instanceType,omitempty"`
SubnetId string `json:"subnetId,omitempty"`
VpcId string `json:"vpcId,omitempty"`
SecurityGroupId []string `json:"securityGroupIds,omitempty"`
ImageId string `json:"imageId,omitempty"`
InstanceChargeType string `json:"instanceChargeType,omitempty"`
Minimum int32 `json:"minimum,omitempty"`
Maximum int32 `json:"maximum,omitempty"`
}
// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Size int32 `json:"size,omitempty"`
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"`
Instances []Instances `json:"instances,omitempty"`
}
type Instances struct {
InstanceId string `json:"instanceId,omitempty"`
PublicIp string `json:"publicIp,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// Spotpool is the Schema for the spotpools API
type Spotpool struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SpotpoolSpec `json:"spec,omitempty"`
Status SpotpoolStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Spotpool `json:"items"`
}
func init() {
SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}
在 SpotpoolSpec
中定义设计的 CRD 结构体,这些字段都是创建虚拟机的必要字段。另外,在 SpotpoolStatus
中定义返回状态里的信息,这里只需要 Instance 相关的信息。
(2)生成代码
API 相关的代码开发完后,执行以下命令生成代码:
make generate
make manifests
4、Controller 开发
(1)开发控制器逻辑
控制器的主逻辑是:
- 从云平台获取运行的实例数
- 判断实例数和期望的实例数是否相等
- 如果小于期望值,则创建实例
- 如果大于期望值,则删除实例
所以主逻辑的代码如下,修改internal/controller/spotpool_controller.go
:
func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
// 获取用户期望
spotpool := &devopsjokerbaicomv1.Spotpool{}
if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
log.Error(err, "unable to fetch spotspool")
}
// 从云平台获取获取运行的实例
runningVmList, err := r.getRunningInstanceIds(spotpool)
if err != nil {
log.Error(err, "get running vm instance failed")
// 十秒后重试
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
runningCount := len(runningVmList)
switch {
case runningCount < int(spotpool.Spec.Minimum):
// 创建实例扩容
delta := spotpool.Spec.Minimum - int32(runningCount)
log.Info("creating instances", "delta", delta)
err = r.runInstances(spotpool, delta)
if err != nil {
log.Error(err, "unable to create instances")
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
case runningCount > int(spotpool.Spec.Maximum):
// 删除实例缩容
delta := int32(runningCount) - spotpool.Spec.Maximum
log.Info("terminating instances", "delta", delta)
err = r.terminateInstances(spotpool, delta)
if err != nil {
log.Error(err, "unable to terminate instances")
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
}
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
其中:
r.getRunningInstanceIds(spotpool)
用户获取云平台运行的实例数r.runInstances(spotpool, delta)
用于调用云平台进行扩容r.terminateInstances(spotpool, delta)
用于调用云平台进行缩容
接下来分别实现上面的三个方法。
(1)首先,实现 getRunningInstanceIds
方法
func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return nil, err
}
request := cvm.NewDescribeInstancesRequest()
response, err := client.DescribeInstances(request)
if err != nil {
return nil, err
}
var instances []devopsjokerbaicomv1.Instances
var runningInstanceIDs []string
for _, instance := range response.Response.InstanceSet {
if *instance.InstanceState == "RUNNING" || *instance.InstanceState == "PENDING" || *instance.InstanceState == "STARTING" {
runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
}
// 检查实例的公网 IP,如果不存在公网 IP,则继续重试
if len(instance.PublicIpAddresses) == 0 {
return nil, fmt.Errorf("instance %s does not have public ip", *instance.InstanceId)
}
instances = append(instances, devopsjokerbaicomv1.Instances{
InstanceId: *instance.InstanceId,
PublicIp: *instance.PublicIpAddresses[0],
})
}
// 更新 status
spotpool.Status.Instances = instances
err = r.Status().Update(context.Background(), spotpool)
if err != nil {
return nil, err
}
return runningInstanceIDs, nil
}
// 获取腾讯云 SDK client
func (r *SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (*cvm.Client, error) {
credential := common.NewCredential(spec.SecretId, spec.SecretKey)
cpf := profile.NewClientProfile()
cpf.HttpProfile.ReqMethod = "POST"
cpf.HttpProfile.ReqTimeout = 30
cpf.SignMethod = "HmacSHA1"
client, err := cvm.NewClient(credential, spec.Region, cpf)
if err != nil {
return nil, err
}
return client, nil
}
其中:
- 调用
r.createCVMClient(spotpool.Spec)
获取腾讯云SDK client - 然后调用
client.DescribeInstances(request)
获取实例详细信息 - 最后通过判断
instance.InstanceStat
和instance.PublicIpAddresses
的状态信息决定是否是需要的实例 - 最后返回实例列表信息
(2)实现 r.runInstances(spotpool, delta)
用于调用云平台进行扩容
func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}
request := cvm.NewRunInstancesRequest()
request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
request.Placement = &cvm.Placement{
Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
}
request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
request.InstanceCount = common.Int64Ptr(int64(count))
request.InstanceName = common.StringPtr("spotpool" + time.Now().Format("20060102150405"))
request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
request.InternetAccessible = &cvm.InternetAccessible{
InternetChargeType: common.StringPtr("BANDWIDTH_POSTPAID_BY_HOUR"),
InternetMaxBandwidthOut: common.Int64Ptr(1),
PublicIpAssigned: common.BoolPtr(true),
}
request.LoginSettings = &cvm.LoginSettings{
Password: common.StringPtr("Password123"),
}
request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
request.SystemDisk = &cvm.SystemDisk{
DiskType: common.StringPtr("CLOUD_BSSD"),
DiskSize: common.Int64Ptr(100),
}
request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
VpcId: common.StringPtr(spotpool.Spec.VpcId),
}
// print request
fmt.Println(request.ToJsonString())
// 创建实例
response, err := client.RunInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}
// 获取到返回的 instancesid
instanceIds := make([]string, 0, len(response.Response.InstanceIdSet))
for _, instanceId := range response.Response.InstanceIdSet {
instanceIds = append(instanceIds, *instanceId)
}
fmt.Println("run instances success", instanceIds)
// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}
这个方法主要是调用 client.RunInstances(request)
进行实例创建,然后调用 r.getRunningInstanceIds(spotpool)
更新 status
的状态信息。
(3)开发r.terminateInstances(spotpool, delta)
用于调用云平台进行缩容
func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}
runningInstances, err := r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
instancesIds := runningInstances[:count]
request := cvm.NewTerminateInstancesRequest()
request.InstanceIds = common.StringPtrs(instancesIds)
// 获取返回
response, err := client.TerminateInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}
fmt.Println("Terminate response: ", response)
fmt.Println("terminate instances success", instancesIds)
// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}
删除实例和创建实例的实现逻辑类似,先调用 client.TerminateInstances(request)
进行删除,然后调用 r.getRunningInstanceIds(spotpool)
更新状态。
上面三个步骤完成了主要逻辑开发,可以初步实现具体的效果,如果希望功能更健全,则需要对其进行开发优化。
部署和测试
1、本地测试
# 安装 CRD
make install
# 运行 controller
make run
2、创建 Spotpool 实例测试
(1)创建 Spotpool 资源清单,编辑 config/samples/devops.jokerbai.com_v1_spotpool.yaml
apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
labels:
app.kubernetes.io/name: spotpool
app.kubernetes.io/managed-by: kustomize
name: spotpool-sample
spec:
secretId: xxx
secretKey: xxx
region: ap-singapore
availabilityZone: ap-singapore-2
instanceType: "GN7.2XLARGE32"
minimum: 2
maximum: 2
subnetId: DEFAULT
vpcId: DEFAULT
securityGroupIds:
- sg-xxx
imageId: img-xxx
instanceChargeType: SPOTPAID
(2)运行资源清单
# 创建实例
kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml
# 查看状态
kubectl get spotpool
(3)构建并部署到集群
# 构建镜像
make docker-build docker-push IMG=<your-registry>/spotpool:v1
# 部署到集群
make deploy IMG=<your-registry>/spotpool:v1
(4)清理
# 删除 operator
make undeploy
# 删除 CRD
make uninstall
最后
本文通过结合 Kubernetes、AI 和云平台,深入探讨了如何利用 K8s Operator 实现对 GPU 资源池的自动化管理。我们从 Operator 的核心概念出发,介绍了 CRD(自定义资源定义)和控制器的设计原理,并基于 kubebuilder 开发了一个名为 Spotpool 的 Operator,用于在腾讯云上维护竞价实例的稳定运行。
整个开发过程遵循“声明式 API”的思想,用户只需定义期望的状态(如最小/最大实例数),Operator 便会在后台持续监控并自动调整实际状态,确保资源池始终符合预期。这不仅极大地简化了运维操作,也提升了 AI 模型训练平台的稳定性和弹性。
Operator 是云原生时代自动化运维的重要利器。掌握其开发方法,意味着我们不仅能“用好” Kubernetes,更能“扩展” Kubernetes,为复杂业务场景提供定制化的解决方案。